/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.metron.storm.kafka.flux;

import com.google.common.base.Joiner;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.metron.common.utils.KafkaUtils;
import org.apache.storm.kafka.spout.*;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.OutputFieldsGetter;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

/**
 * This is a convenience layer on top of the KafkaSpoutConfig.Builder available in storm-kafka-client.
 * The justification for this class is three-fold.  First, there are a lot of moving parts and a simplified
 * approach to constructing spouts is useful.  Secondly, and perhaps more importantly, the Builder pattern
 * is decidedly unfriendly to use inside of Flux.  Finally, we can make things a bit more friendly by only requiring
 * zookeeper and automatically figuring out the brokers for the bootstrap server.
 *
 * @param <K> The kafka key type
 * @param <V> The kafka value type
 */
public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V> {
  final static String STREAM = "default";

  /**
   * The fields exposed by the kafka consumer.  These will show up in the Storm tuple.
   */
  public enum FieldsConfiguration {
    KEY("key", record -> record.key()),
    VALUE("value", record -> record.value()),
    PARTITION("partition", record -> record.partition()),
    TOPIC("topic", record -> record.topic())
    ;
    String fieldName;
    Function<ConsumerRecord,Object> recordExtractor;

    FieldsConfiguration(String fieldName, Function<ConsumerRecord,Object> recordExtractor) {
      this.recordExtractor = recordExtractor;
      this.fieldName = fieldName;
    }

    /**
     * Return a list of the enums
     * @param configs
     * @return
     */
    public static List<FieldsConfiguration> toList(String... configs) {
      List<FieldsConfiguration> ret = new ArrayList<>();
      for(String config : configs) {
        ret.add(FieldsConfiguration.valueOf(config.toUpperCase()));
      }
      return ret;
    }

    /**
     * Return a list of the enums from their string representation.
     * @param configs
     * @return
     */
    public static List<FieldsConfiguration> toList(List<String> configs) {
      List<FieldsConfiguration> ret = new ArrayList<>();
      for(String config : configs) {
        ret.add(FieldsConfiguration.valueOf(config.toUpperCase()));
      }
      return ret;
    }

    /**
     * Construct a Fields object from an iterable of enums.  These fields are the fields
     * exposed in the Storm tuple emitted from the spout.
     * @param configs
     * @return
     */
    public static Fields getFields(Iterable<FieldsConfiguration> configs) {
      List<String> fields = new ArrayList<>();
      for(FieldsConfiguration config : configs) {
        fields.add(config.fieldName);
      }
      return new Fields(fields);
    }
  }

  /**
   * Build a tuple given the fields and the topic.  We want to use our FieldsConfiguration enum
   * to define what this tuple looks like.
   * @param <K> The key type in kafka
   * @param <V> The value type in kafka
   */
  public static class TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
    private List<FieldsConfiguration> configurations;
    private TupleBuilder(String topic, List<FieldsConfiguration> configurations) {
      super(topic);
      this.configurations = configurations;
    }

    /**
     * Builds a list of tuples using the ConsumerRecord specified as parameter
     *
     * @param consumerRecord whose contents are used to build tuples
     * @return list of tuples
     */
    @Override
    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
      Values ret = new Values();
      for(FieldsConfiguration config : configurations) {
        ret.add(config.recordExtractor.apply(consumerRecord));
      }
      return ret;
    }
  }

  private String topic;

  /**
   * Create an object with the specified properties.  This will expose fields "key" and "value."
   * @param kafkaProps The special kafka properties
   * @param topic The kafka topic. TODO: In the future, support multiple topics and regex patterns.
   * @param zkQuorum The zookeeper quorum.  We will use this to pull the brokers from this.
   */
  public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps
                                , String topic
                                , String zkQuorum
                                )
  {
    this(kafkaProps, topic, zkQuorum, Arrays.asList("key", "value"));
  }

  /**
   * Create an object with the specified properties and exposing the specified fields.
   * @param kafkaProps The special kafka properties
   * @param topic The kafka topic. TODO: In the future, support multiple topics and regex patterns.
   * @param zkQuorum The zookeeper quorum.  We will use this to pull the brokers from this.
   * @param fieldsConfiguration The fields to expose in the storm tuple emitted.
   */
  public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps
                                , String topic
                                , String zkQuorum
                                , List<String> fieldsConfiguration
                                )
  {
    super( modifyKafkaProps(kafkaProps, zkQuorum)
         , createStreams(fieldsConfiguration, topic)
         , createTuplesBuilder(fieldsConfiguration, topic)
         );
    this.topic = topic;
  }

  /**
   * Get the kafka topic.  TODO: In the future, support multiple topics and regex patterns.
   * @return
   */
  public String getTopic() {
    return topic;
  }

  /**
   * Create a StormKafkaSpout from a given topic, zookeeper quorum and fields.  Also, configure the spout
   * using a Map that configures both kafka as well as the spout (see the properties in SpoutConfiguration).
   * @param topic
   * @param zkQuorum
   * @param fieldsConfiguration
   * @param kafkaProps  The aforementioned map.
   * @return
   */
  public static <K, V> StormKafkaSpout<K, V> create( String topic
                                                   , String zkQuorum
                                                   , List<String> fieldsConfiguration
                                                   , Map<String, Object> kafkaProps
  )
  {
    Map<String, Object> spoutConfig = SpoutConfiguration.separate(kafkaProps);

    SimpleStormKafkaBuilder<K, V> builder = new SimpleStormKafkaBuilder<>(kafkaProps, topic, zkQuorum, fieldsConfiguration);
    SpoutConfiguration.configure(builder, spoutConfig);
    return new StormKafkaSpout<>(builder);
  }

  private static Map<String, Object> modifyKafkaProps(Map<String, Object> props, String zkQuorum) {
    try {
      if(!props.containsKey(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS)) {
        //this isn't a putIfAbsent because I only want to pull the brokers from zk if it's absent.
        List<String> brokers = KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum);
        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, Joiner.on(",").join(brokers));
      }
      props.putIfAbsent(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
      props.putIfAbsent(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());

    } catch (Exception e) {
      throw new IllegalStateException("Unable to retrieve brokers from zookeeper: " + e.getMessage(), e);
    }
    return props;
  }

  private static <K,V> KafkaSpoutTuplesBuilder<K, V> createTuplesBuilder(List<String> config, String topic) {
    TupleBuilder<K, V> tb =  new TupleBuilder<K, V>(topic, FieldsConfiguration.toList(config));
    return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(tb).build();
  }


  private static KafkaSpoutStreams createStreams(List<String> config, String topic) {
    final Fields fields = FieldsConfiguration.getFields(FieldsConfiguration.toList(config));
    return new KafkaSpoutStreamsNamedTopics.Builder(fields, STREAM, new String[] { topic} ).build();
  }

}
